package hello_broker

import (
	"context"
	"fmt"
	"github.com/IBM/sarama"
	"github.com/go-kratos/kratos/v2/log"
	segmentioKafka "github.com/segmentio/kafka-go"
	"github.com/tx7do/kratos-transport/broker"
	"github.com/tx7do/kratos-transport/broker/kafka"
	"github.com/tx7do/kratos-transport/broker/nsq"
	"github.com/tx7do/kratos-transport/broker/rocketmq"
	"kratos_kafka/internal/conf"
	"time"
)

// Broker bundles the message-queue clients created by NewBroker: two
// kratos-transport brokers (Kafka and Nsq), a segmentio/kafka-go writer and a
// sarama synchronous producer, alongside the bootstrap configuration and a
// log helper.
type Broker struct {
	conf            *conf.Bootstrap
	logger          *log.Helper
	Kafka           broker.Broker
	Nsq             broker.Broker
	SegKafkaMyTopic *segmentioKafka.Writer
	// Synchronous producer from the sarama Kafka client.
	// TODO: decide between SyncProducer and AsyncProducer.
	SaramaSyncProducer sarama.SyncProducer

	// The RocketMQ broker is currently disabled.
	//RocketMq        broker.Broker
}

// NewBroker builds a Broker from the bootstrap configuration. Every client is
// created eagerly, in this order: kratos-transport Kafka broker, NSQ broker,
// segmentio writer, sarama sync producer. A panic raised by any of the Run
// calls propagates to the caller. The RocketMQ broker is currently disabled.
func NewBroker(conf *conf.Bootstrap, logger log.Logger) *Broker {
	b := &Broker{
		conf:   conf,
		logger: log.NewHelper(logger),
	}

	b.Kafka = NewBootKafka(conf, logger).Run()
	b.Nsq = NewBootNsq(conf, logger).Run()
	b.SegKafkaMyTopic = NewSegmentIoKafkaMyTopic(conf, logger).Run()
	b.SaramaSyncProducer = NewSaramaKafkaProducer(conf, logger).Run()
	//b.RocketMq = NewBootRocket(conf, logger).Run()

	return b
}

// SaramaKafka holds the bootstrap configuration and a log helper; its Run
// method creates a sarama SyncProducer.
type SaramaKafka struct {
	conf   *conf.Bootstrap
	logger *log.Helper
}

// NewSaramaKafkaProducer returns a SaramaKafka value holding conf and a log
// helper wrapping logger. Call Run on the result to create the producer.
func NewSaramaKafkaProducer(conf *conf.Bootstrap, logger log.Logger) SaramaKafka {
	helper := log.NewHelper(logger)
	return SaramaKafka{conf: conf, logger: helper}
}

// Run creates a sarama SyncProducer for the broker addresses found in
// conf.DelayQueue.SaramaKafka.Broker.Addr and returns it. It panics if the
// producer cannot be created.
//
// TODO: nothing closes the returned producer yet; decide where Close belongs.
func (s SaramaKafka) Run() sarama.SyncProducer {
	cfg := sarama.NewConfig()
	opts := &cfg.Producer

	// A SyncProducer needs both result channels enabled.
	opts.Return.Successes = true
	opts.Return.Errors = true

	// The partitioner decides which partition a message goes to; this one
	// hashes the message key.
	opts.Partitioner = sarama.NewHashPartitioner

	// Acknowledgement level. The alternatives are sarama.NoResponse (do not
	// wait for the broker) and sarama.WaitForLocal (wait for the local commit
	// only); WaitForAll waits for all in-sync replicas.
	opts.RequiredAcks = sarama.WaitForAll

	// Size limit, broker-side ack timeout, retry policy and compression level.
	opts.MaxMessageBytes = 1000000
	opts.Timeout = 10 * time.Second
	opts.Retry.Max = 3
	opts.Retry.Backoff = 100 * time.Millisecond
	opts.CompressionLevel = sarama.CompressionLevelDefault

	syncProducer, err := sarama.NewSyncProducer(s.conf.DelayQueue.SaramaKafka.Broker.Addr, cfg)
	if err != nil {
		panic(err)
	}
	return syncProducer
}

// SegmentIoKafka holds the bootstrap configuration and a log helper; its Run
// method creates a segmentio/kafka-go Writer.
type SegmentIoKafka struct {
	conf   *conf.Bootstrap
	logger *log.Helper
}

// NewSegmentIoKafkaMyTopic returns a SegmentIoKafka holding conf and a log
// helper wrapping logger. Call Run on the result to create the writer.
func NewSegmentIoKafkaMyTopic(conf *conf.Bootstrap, logger log.Logger) *SegmentIoKafka {
	return &SegmentIoKafka{
		conf:   conf,
		logger: log.NewHelper(logger),
	}
}

// Run returns a segmentio/kafka-go Writer for the topic
// conf.MessageQueue.Kafka.MyTopic, addressed at
// conf.MessageQueue.Kafka.Broker.Addr and using the Hash balancer.
//
// The topic has to be specified on the writer. Async is enabled so that
// messages are sent asynchronously even when the caller writes them from a
// plain sequential loop. A Transport would additionally be needed for TLS
// connections; none is set here.
func (s *SegmentIoKafka) Run() *segmentioKafka.Writer {
	brokers := segmentioKafka.TCP(s.conf.MessageQueue.Kafka.Broker.Addr...)

	return &segmentioKafka.Writer{
		Addr:     brokers,
		Topic:    s.conf.MessageQueue.Kafka.MyTopic,
		Balancer: &segmentioKafka.Hash{},
		Async:    true,
	}
}

// Kafka holds the bootstrap configuration and a log helper; its Run method
// creates the kratos-transport Kafka broker.
type Kafka struct {
	conf   *conf.Bootstrap
	logger *log.Helper
}

// NewBootKafka returns a Kafka bootstrapper holding conf and a log helper
// wrapping logger. Call Run on the result to create and connect the broker.
func NewBootKafka(conf *conf.Bootstrap, logger log.Logger) *Kafka {
	k := &Kafka{conf: conf}
	k.logger = log.NewHelper(logger)
	return k
}

// Run creates the kratos-transport Kafka broker for the addresses in
// conf.MessageQueue.Kafka.Broker.Addr, initialises it and connects.
//
// An Init error is logged instead of being silently discarded. A Connect
// error is fatal: Run panics with a message describing the failure.
func (b *Kafka) Run() broker.Broker {
	ctx := context.Background()
	kafkaBroker := kafka.NewBroker(
		broker.WithOptionContext(ctx),
		broker.WithAddress(b.conf.MessageQueue.Kafka.Broker.Addr...),
	)

	// Surface initialisation problems; Connect below still decides whether
	// the broker is usable.
	if err := kafkaBroker.Init(); err != nil {
		b.logger.WithContext(ctx).Errorw("reason", "hello_broker kafka init error", "error", err)
	}

	if err := kafkaBroker.Connect(); err != nil {
		panic(fmt.Sprintf("hello_broker: cannot connect to kafka broker: %v", err))
	}
	return kafkaBroker
}

// Nsq holds the bootstrap configuration and a log helper; its Run method
// creates the kratos-transport NSQ broker.
type Nsq struct {
	conf   *conf.Bootstrap
	logger *log.Helper
}

// NewBootNsq returns an Nsq bootstrapper holding conf and a log helper
// wrapping logger. Call Run on the result to create and connect the broker.
func NewBootNsq(conf *conf.Bootstrap, logger log.Logger) *Nsq {
	helper := log.NewHelper(logger)
	return &Nsq{conf: conf, logger: helper}
}

// Run creates the kratos-transport NSQ broker for the addresses in
// conf.MessageQueue.Nsq.Broker.Addr, initialises it and connects.
//
// Both Init and Connect failures are logged, not fatal: the broker is
// returned either way, so it may be unconnected when Run returns.
func (b *Nsq) Run() broker.Broker {
	ctx := context.Background()
	nsqBroker := nsq.NewBroker(
		broker.WithOptionContext(ctx),
		broker.WithAddress(b.conf.MessageQueue.Nsq.Broker.Addr...),
	)

	// Report initialisation problems instead of dropping the error.
	if err := nsqBroker.Init(); err != nil {
		b.logger.WithContext(ctx).Errorw("reason", "hello_broker nsq init error", "error", err)
	}

	if err := nsqBroker.Connect(); err != nil {
		b.logger.WithContext(ctx).Errorw("reason", "hello_broker nsq connect error", "error", err)
	}
	return nsqBroker
}

// Rocket holds the bootstrap configuration and a log helper; its Run method
// creates the kratos-transport RocketMQ broker.
type Rocket struct {
	conf   *conf.Bootstrap
	logger *log.Helper
}

// NewBootRocket returns a Rocket bootstrapper holding conf and a log helper
// wrapping logger. Call Run on the result to create and connect the broker.
func NewBootRocket(conf *conf.Bootstrap, logger log.Logger) *Rocket {
	r := new(Rocket)
	r.conf = conf
	r.logger = log.NewHelper(logger)
	return r
}

// Run creates the kratos-transport RocketMQ broker with Aliyun HTTP support
// and tracing enabled, initialises it and connects. The endpoint, credentials
// and instance id are the Aliyun RocketMQ parameters read from
// conf.MessageQueue.Rocket.Broker.
//
// Both Init and Connect failures are logged, not fatal: the broker is
// returned either way, so it may be unconnected when Run returns.
func (b *Rocket) Run() broker.Broker {
	ctx := context.Background()
	rocketBroker := rocketmq.NewBroker(
		broker.WithOptionContext(ctx),
		// RocketMQ-specific options.
		rocketmq.WithAliyunHttpSupport(),
		rocketmq.WithEnableTrace(),
		// Aliyun RocketMQ parameters taken from the configuration file.
		// NOTE(review): AccessId feeds WithAccessKey and AccessKey feeds
		// WithSecretKey — confirm this mapping matches the config naming.
		rocketmq.WithNameServerDomain(b.conf.MessageQueue.Rocket.Broker.EndPoint),
		rocketmq.WithAccessKey(b.conf.MessageQueue.Rocket.Broker.AccessId),
		rocketmq.WithSecretKey(b.conf.MessageQueue.Rocket.Broker.AccessKey),
		rocketmq.WithInstanceName(b.conf.MessageQueue.Rocket.Broker.InstanceId),
	)

	// Report initialisation problems instead of dropping the error.
	if err := rocketBroker.Init(); err != nil {
		b.logger.WithContext(ctx).Errorw("reason", "hello_broker rocket init error", "error", err)
	}

	if err := rocketBroker.Connect(); err != nil {
		b.logger.WithContext(ctx).Errorw("reason", "hello_broker rocket connect error", "error", err)
	}
	return rocketBroker
}
